{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Kinesis Data Stream\n",
    "* https://github.com/aws-samples/aws-ml-data-lake-workshop\n",
    "* https://aws.amazon.com/blogs/big-data/snakes-in-the-stream-feeding-and-eating-amazon-kinesis-streams-with-python/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "import sagemaker\n",
    "import pandas as pd\n",
    "\n",
    "sess   = sagemaker.Session()\n",
    "bucket = sess.default_bucket()\n",
    "role = sagemaker.get_execution_role()\n",
    "region = boto3.Session().region_name\n",
    "\n",
    "sm = boto3.Session().client(service_name='sagemaker', region_name=region)\n",
    "kn_data = boto3.Session().client(service_name='kinesis', region_name=region)\n",
    "kn_firehose = boto3.Session().client(service_name='firehose', region_name=region)\n",
    "sts = boto3.Session().client(service_name='sts', region_name=region)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "kn_data.list_streams()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Step 1: Create a Kinesis Data Stream"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "stream_name = \"dsoaws-data-stream\"\n",
    "shard_count = 2"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = kn_data.create_stream(\n",
    "    StreamName=stream_name, \n",
    "    ShardCount=shard_count\n",
    ")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(response)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "data_stream_response = kn_data.describe_stream(\n",
    "    StreamName=stream_name\n",
    ")\n",
    "\n",
    "print(data_stream_response)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "data_stream_arn = data_stream_response['StreamDescription']['StreamARN']\n",
    "print(data_stream_arn)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "iam_kinesis_role_name = 'DSOAWS_Kinesis'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "assume_role_policy_doc = {\n",
    "  \"Version\": \"2012-10-17\",\n",
    "  \"Statement\": [\n",
    "    {\n",
    "      \"Effect\": \"Allow\",\n",
    "      \"Principal\": {\n",
    "        \"Service\": \"kinesis.amazonaws.com\"\n",
    "      },\n",
    "      \"Action\": \"sts:AssumeRole\"\n",
    "    },\n",
    "    {\n",
    "      \"Effect\": \"Allow\",\n",
    "      \"Principal\": {\n",
    "        \"Service\": \"firehose.amazonaws.com\"\n",
    "      },\n",
    "      \"Action\": \"sts:AssumeRole\"\n",
    "    }      \n",
    "  ]\n",
    "} "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "import boto3\n",
    "import time\n",
    "\n",
    "from botocore.exceptions import ClientError\n",
    "\n",
    "try:\n",
    "    iam = boto3.client('iam')\n",
    "\n",
    "    iam_role_kinesis = iam.create_role(\n",
    "        RoleName=iam_kinesis_role_name,\n",
    "        AssumeRolePolicyDocument=json.dumps(assume_role_policy_doc),\n",
    "        Description='DSOAWS Kinesis Role'\n",
    "    )\n",
    "except ClientError as e:\n",
    "    if e.response['Error']['Code'] == 'EntityAlreadyExists':\n",
    "        iam_role_kinesis = iam.get_role(RoleName=iam_kinesis_role_name)\n",
    "        print(\"Role already exists\")\n",
    "    else:\n",
    "        print(\"Unexpected error: %s\" % e)\n",
    "        \n",
    "time.sleep(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "iam_role_kinesis_arn = iam_role_kinesis['Role']['Arn']\n",
    "print(iam_role_kinesis_arn)\n",
    "\n",
    "iam_role_kinesis_name = iam_role_kinesis['Role']['RoleName']\n",
    "print(iam_role_kinesis_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "account_id = sts.get_caller_identity()['Account']"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "kinesis_policy_doc = {\n",
    "    \"Version\": \"2012-10-17\",\n",
    "    \"Statement\": [\n",
    "         {\n",
    "            \"Effect\": \"Allow\",\n",
    "            \"Action\": [\n",
    "                \"firehose:DeleteDeliveryStream\",\n",
    "                \"firehose:PutRecord\",\n",
    "                \"firehose:PutRecordBatch\",\n",
    "                \"firehose:UpdateDestination\",\n",
    "            ],\n",
    "            \"Resource\": [\n",
    "                \"arn:aws:firehose:{}:{}:deliverystream/{}\".format(region, account_id, stream_name)\n",
    "            ]\n",
    "         },\n",
    "         {\n",
    "            \"Effect\": \"Allow\",\n",
    "            \"Action\": [\n",
    "                \"kinesis:Get*\",\n",
    "                \"kinesis:DescribeStream\",\n",
    "                \"kinesis:Put*\",\n",
    "                \"kinesis:List*\",                \n",
    "            ],\n",
    "            \"Resource\": [\n",
    "                \"arn:aws:kinesis:{}:{}:stream/{}\".format(region, account_id, stream_name)\n",
    "            ]\n",
    "         }\n",
    "        \n",
    "    ]\n",
    "}\n",
    "\n",
    "print(kinesis_policy_doc)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Update Policy"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "\n",
    "response = iam.put_role_policy(\n",
    "    RoleName=iam_role_kinesis_name,\n",
    "    PolicyName='DSOAWS_KinesisPolicy',\n",
    "    PolicyDocument=json.dumps(kinesis_policy_doc)\n",
    ")\n",
    "\n",
    "time.sleep(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(response)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create a Kinesis Firehose Stream with Source Data Stream"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "firehose_name = 'dsoaws-firehose-stream'\n",
    "delivery_stream_type = 'KinesisStreamAsSource'\n",
    "bucket_arn = 'arn:aws:s3:::dsoaws-streaming-data'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "firehose_response = kn_firehose.create_delivery_stream(\n",
    "    DeliveryStreamName=firehose_name,\n",
    "    DeliveryStreamType=delivery_stream_type,\n",
    "    KinesisStreamSourceConfiguration={\n",
    "        'KinesisStreamARN': data_stream_arn,\n",
    "        'RoleARN': iam_role_kinesis_arn\n",
    "    },\n",
    "    ExtendedS3DestinationConfiguration={\n",
    "        'RoleARN': iam_role_kinesis_arn,\n",
    "        'BucketARN': bucket_arn\n",
    "    }\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(firehose_response)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Put Records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 cp 's3://amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Software_v1_00.tsv.gz' ./data/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import csv\n",
    "import pandas as pd\n",
    "\n",
    "df = pd.read_csv('./data/amazon_reviews_us_Digital_Software_v1_00.tsv.gz', \n",
    "                 delimiter='\\t', \n",
    "                 quoting=csv.QUOTE_NONE,\n",
    "                 compression='gzip')\n",
    "df.shape"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.head(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "partition_key = '123'\n",
    "#reviews_tsv = '5\\tThis is a 5 star review\\n1\\tThis is a 1 star review\\n'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_star_rating_and_review_body = df[['star_rating', 'review_body']][:100]\n",
    "df_star_rating_and_review_body.shape"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_star_rating_and_review_body.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "reviews_tsv = df_star_rating_and_review_body.to_csv(sep='\\t',\n",
    "                                                    header=None,\n",
    "                                                    index=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "reviews_tsv"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "data_stream = boto3.Session().client(service_name='kinesis', region_name=region)\n",
    "\n",
    "response = data_stream.put_records(\n",
    "    Records=[\n",
    "        {\n",
    "            'Data': reviews_tsv.encode('utf-8'),\n",
    "            'PartitionKey': partition_key\n",
    "        },\n",
    "    ],\n",
    "    StreamName=stream_name\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Get Records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "shard_id_1 = 'shardId-000000000000'\n",
    "shard_id_2 = 'shardId-000000000001'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "shard_iter_1 = data_stream.get_shard_iterator(StreamName=stream_name, \n",
    "                                            ShardId=shard_id_1, \n",
    "                                            ShardIteratorType='TRIM_HORIZON')['ShardIterator']\n",
    "\n",
    "shard_iter_2 = data_stream.get_shard_iterator(StreamName=stream_name, \n",
    "                                            ShardId=shard_id_2, \n",
    "                                            ShardIteratorType='TRIM_HORIZON')['ShardIterator']"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "records_response_1 = data_stream.get_records(\n",
    "    ShardIterator=shard_iter_1,\n",
    "    Limit=100\n",
    ")\n",
    "print(records_response_1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(records_response_1['Records'][0]['Data'].decode('utf-8'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "records_response_2 = data_stream.get_records(\n",
    "    ShardIterator=shard_iter_2,\n",
    "    Limit=100\n",
    ")\n",
    "print(records_response_2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(records_response_2['Records'][0]['Data'].decode('utf-8'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
